---
title: API StreamFunction
---

import { Callout } from 'nextra-theme-docs'


# API StreamFunction

StreamFunction is a stateful serverless function that handle chunked data from [Zipper][zipper], and return a chunked data to [Zipper][zipper].

## func yomo.NewStreamFunction

```go
yomo.NewStreamFunction(name, zipperAddr string, opts ...SfnOption) StreamFunction
```

Create a stream function instance.

- `name`: The name of the stream function.
- `zipperAddr`: The endpoint of the [Zipper][zipper] to connect to.
- `opts`: The [SfnOption][sfnOption] when create the stream function.

example:

```go
sfn := yomo.NewStreamFunction(
  "stream-llm-inf-response", 
  "localhost:9000",
  yomo.WithCredential("token:123456abcdefg"),
)
```

## type StreamFunction

### sfn.SetObserveDataTags(tags ...Tag)

Set the data [Tag][tag] list that will be observed from [Zipper][zipper].

- `tags`: The data [Tag][tag] list.

### sfn.Init(fn)

<Callout emoji="ℹ️" type="info">
This feature is introduced in version 1.14
</Callout>

Set the fn as `initializer` for this serverless, It's **Optional**, if setted, the `fn` will be invoked only once at the first time this serverless start.

- `fn`: The initializer handler with `func () error` signature.

```go {8-10}
sfn := NewStreamFunction(
	"test-sfn",
	"localhost:9000",
)
defer sfn.Close()

// Init fn, load the 7b model to GPU momery when this serverless start.
err := sfn.Init(func() error {
  return LoadModelToMemory('llama-2-7b-chat', 'tokenizer.model')
})

// handle every data to be predicted
sfn.SetHandler(...)

sfn.Connect()
```

### sfn.SetHandler(fn AsyncHandler) error

Set the handler function in [async mode](#type-asynchandler), which accept the raw bytes data from [Zipper][zipper], and return the raw bytes data to [Zipper][zipper].

- `fn`: The handler function of [AsyncHandler](#type-asynchandler).

```go {9-12}
sfn := yomo.NewStreamFunction(
  "my-fn",
  "localhost:9000",
)
defer sfn.Close()

sfn.SetObserveDataTags(0x10)

sfn.SetHandler(func (ctx serverless.Context) {
  data := ctx.Data()
  log.Printf("✅ [my-fn] received <- %v", string(data))
})

err = sfn.Connect()
```

### sfn.SetPipeHandler(fn PipeHandler) error

Set the handler function in [blocking mode](#type-pipehandler), which accept the raw bytes data from [Zipper][zipper], and return the raw bytes data to [Zipper][zipper].

- `fn`: The handler function of [PipeHandler](#type-pipehandler).

### sfn.SetErrorHandler(fn func(err error))

Set the error handler function when server error occurs.

- `fn`: The error handler function.
    - `err`: The error.

### sfn.Connect() error

Create a connection to [Zipper][zipper], when data is received, the handler function will be called.

### sfn.Write(tag Tag, data []byte) error

Write data to [Zipper][zipper].

- `tag`: The data [Tag][tag].
- `data`: The raw bytes data to be wrote.

### sfn.Close() error

Close the connection to [Zipper][zipper].

### sfn.Wait()

Wait until the function fulfilled.

## type SfnOption

### func WithObserveDataTags(tags ...Tag) SfnOption

Set data tag list which observed by this stream function.

- `tags`: The [Tag][tag] list.

### func WithCredential(token string) SfnOption

Set the credential method when this Stream Function instance connect to [Zipper][zipper].

- `token`: The token string.

### func WithClientTLSConfig(tc *tls.Config) SfnOption

Set TLS config for this Stream Function instance.

- `tc`: The TLS config.

### func WithClientQuicConfig(qc *quic.Config) SfnOption

Set QUIC config for this Stream Function instance.

- `qc`: The [QUIC config](https://pkg.go.dev/github.com/lucas-clemente/quic-go#Config).

### WithLogger(logger *slog.Logger) SfnOption

Set the logger for this Source instance.

- `logger`: The logger.

### WithSfnTracerProvider(tp *tracesdk.TracerProvider) SfnOption

Set the tracer provider for this Source instance.

- `tp`: The tracer provider.

```go
tp, shutdown, err := trace.NewTracerProvider("yomo-sfn")
if err == nil {
	log.Println("[sfn] 🛰 trace enabled")
}
defer shutdown(context.Background())

// stateful function
sfn := yomo.NewStreamFunction(
	"sfn",
	"localhost:9000",
	yomo.WithSfnTracerProvider(tp),  // use tracer provider
)
```

More about [YoMo Observability](/docs/opentracing).

## type AsyncHandler

```go
type AsyncHandler func(reqData []byte) (respTag Tag, respData []byte)
```

The request-response mode handler function, async mode.

- `reqData`: The raw bytes data received from [Zipper][zipper].

Returns:

- `respTag`: The [Tag][tag] of the response data.
- `respData`: The raw bytes data to be wrote to [Zipper][zipper].

AsyncHandler is used to handle high concurrent requests, and the response data will be sent to [Zipper][zipper] after the handler function returns.

## type PipeHandler

```go
type PipeHandler func(in <-chan []byte, out chan<- *PayloadFrame)
```

The blocking mode handler function.

- `in`: The input channel of the raw bytes data received from [Zipper][zipper].
- `out`: The output channel of the PayloadFrame to be wrote to [Zipper][zipper].

PipeHandler is used to handle chunked stream data, like video stream, audio stream, behavior sequence data, etc. Ingress data will be guarantee the order, and the egress data will be guarantee the order too. By this, developers can read video stream data continuously, then handle the frames by an AI model, and write the inference result back to user instantly.

[sfnOption]: #type-sfnoption
[zipper]: ../cli/zipper
[tag]: ./tag
